package com.tt.notify.consumer.listener;

import java.nio.charset.Charset;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.tt.notify.common.exception.MQException;
import com.tt.notify.consumer.vo.MessageVO;

import lombok.Getter;

/**
 * Listener wrapper that encapsulates the common consumption logic: it validates
 * the incoming batch, converts each RocketMQ {@link MessageExt} into a
 * {@link MessageVO} and forwards it to the business listener.
 *
 * @author  liuhaihui
 * @date    2017-07-13 19:21:19
 * @version
 */
@Service
public class MQMessageWrapper implements MessageListenerConcurrently {

	/** Business listener that receives the converted messages; injected via {@code @Autowired}. */
	@Getter
	@Autowired
	private MQMessageListener rocketMqMessageListener;

	/**
	 * Consumes a batch of messages by forwarding each one to the business listener.
	 *
	 * <p>The consumer is expected to be configured with
	 * {@code consumeMessageBatchMaxSize = 1} (see MQConsumer.java), in which case the
	 * batch holds a single message. Larger batches are nevertheless processed
	 * message by message instead of silently acknowledging everything after the
	 * first element.
	 *
	 * @param list    the messages delivered by the consumer; must not be empty
	 * @param context the consume context, used to describe the queue in error messages
	 * @return {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS} when every message was
	 *         accepted by the business listener; {@link ConsumeConcurrentlyStatus#RECONSUME_LATER}
	 *         as soon as one message is {@code null}, belongs to a different topic, or is
	 *         rejected by the business listener
	 * @throws MQException if no business listener is available or the batch is empty
	 */
	@Override
	public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
		if (this.rocketMqMessageListener == null) {
			throw new MQException("请定义一个RocketMqMessageListener的实现类|please define a rocketMqMessageListener for consumer!");
		}
		if (CollectionUtils.isEmpty(list)) {
			MessageQueue messageQueue = context.getMessageQueue();
			if (messageQueue == null) {
				throw new MQException("没有获取到任何数据!");
			}
			throw new MQException(String.format("没有获取到任何数据! Topic:%s, brokerName:%s, queueId:%d",
					messageQueue.getTopic(), messageQueue.getBrokerName(), messageQueue.getQueueId()));
		}
		// Stop at the first failure so the batch is reported as RECONSUME_LATER;
		// with a batch size of 1 this is exactly the single-message behaviour.
		for (MessageExt messageExt : list) {
			if (!dispatch(messageExt)) {
				return ConsumeConcurrentlyStatus.RECONSUME_LATER;
			}
		}
		return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
	}

	/**
	 * Forwards one message to the business listener.
	 *
	 * @param messageExt the raw message, may be {@code null}
	 * @return {@code true} only if the message is non-null, its topic equals the
	 *         listener's topic name, and the listener's {@code onMessage} returned {@code true}
	 */
	private boolean dispatch(MessageExt messageExt) {
		if (messageExt == null) {
			return false;
		}
		if (!rocketMqMessageListener.getTopic().getName().equals(messageExt.getTopic())) {
			return false;
		}
		return rocketMqMessageListener.onMessage(toMessageVO(messageExt));
	}

	/**
	 * Copies the fields the business layer needs from a RocketMQ message into a {@link MessageVO}.
	 *
	 * @param messageExt the raw message, must not be {@code null}
	 * @return a newly populated value object
	 */
	private MessageVO toMessageVO(MessageExt messageExt) {
		MessageVO messageVO = new MessageVO();
		messageVO.setTopic(messageExt.getTopic());
		messageVO.setTags(messageExt.getTags());
		messageVO.setKeys(messageExt.getKeys());
		// NOTE(review): the body is decoded with the platform default charset, so the result
		// depends on the JVM configuration — confirm the producer encodes with the same charset.
		messageVO.setContent(StringUtils.toEncodedString(messageExt.getBody(), Charset.defaultCharset()));
		messageVO.setBornTimestamp(messageExt.getBornTimestamp());
		messageVO.setMsgId(messageExt.getMsgId());
		messageVO.setQueueId(messageExt.getQueueId());
		messageVO.setQueueOffset(messageExt.getQueueOffset());
		messageVO.setReconsumeTimes(messageExt.getReconsumeTimes());
		return messageVO;
	}
}
